package com.change;

import com.change.config.ApplicationProperties;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.*;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.annotation.EnableScheduling;

import java.net.Inet4Address;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;

/**
 * @author tongzhaomei
 * @version 1.0
 * @time 2022-07-15 12-03
 */
@EnableScheduling
@EnableConfigurationProperties({ ApplicationProperties.class })
@SpringBootApplication
@Slf4j
public class AdminApplication {

    private static ApplicationProperties applicationProperties;

    public AdminApplication(ApplicationProperties applicationProperties) {
        this.applicationProperties = applicationProperties;
    }

    public static void main(String[] args) throws PulsarClientException, UnknownHostException {
        SpringApplication.run(AdminApplication.class, args);
        log.info("===>后台管理项目启动完成==>");
        log.info("swagger地址：http://{}{}", Inet4Address.getLocalHost().getHostAddress(), ":8081/doc.html");
//        dockerInvoke();  // 1.python版本在windows系统无法安装
//        tencentInvoke(); // 2.url只能内网访问，外放无法访问
    }
    private static void dockerInvoke() throws PulsarClientException {
        String url = applicationProperties.getPulsar().getUrl();
        String token = applicationProperties.getPulsar().getToken();
        String topicStockMsg = applicationProperties.getPulsar().getTopicStockMsg();
        String subPrice = applicationProperties.getPulsar().getSubPrice();
        PulsarClient client = PulsarClient.builder()
                //ip:port 替换成路由ID，位于【集群管理】接入点列表
                .serviceUrl(url)
                .build();
        System.out.println(">> pulsar client created.");

        //创建生产者
        Producer<byte[]> producer = client.newProducer()
                .topic(topicStockMsg)
                .compressionType(CompressionType.LZ4)
                .sendTimeout(0, TimeUnit.SECONDS)
                .enableBatching(true)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .batchingMaxMessages(1000)
                .maxPendingMessages(1000)
                .blockIfQueueFull(true)
                .roundRobinRouterBatchingPartitionSwitchFrequency(10)
                .batcherBuilder(BatcherBuilder.DEFAULT)
                .create();
        System.out.println(">> pulsar producer created.");

        //生产5条消息
        for (int i = 0; i < 5; i++) {
            String value = "my-sync-message-" + i;
            //发送消息
            MessageId msgId = producer.newMessage().value(value.getBytes()).send();
            System.out.println("发送消息： " + msgId + ",value:" + value);
        }
        //关闭生产者
        producer.close();

/*
        Consumer<byte[]> consumer = client.newConsumer()
                .topic(topicStockMsg)
                .subscriptionName(subPrice)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .negativeAckRedeliveryDelay(60, TimeUnit.SECONDS)
                .receiverQueueSize(1000)
                .subscribe();

        //消费5条消息
        for (int i = 0; i < 5; i++) {
            //接收当前offset对应的一条消息
            Message<byte[]> msg = consumer.receive();
            MessageId msgId = msg.getMessageId();
            String value = new String(msg.getValue());
            System.out.println("接受消息： " + msgId + ",value:" + value);
            //接收到之后必须要ack，否则offset会一直停留在当前消息，无法继续消费
            consumer.acknowledge(msg);
        }

        //关闭消费进程
        consumer.close();*/
        //关闭客户端
        client.close();

    }

    private static void tencentInvoke() throws PulsarClientException {
        String url = applicationProperties.getPulsar().getUrl();
        String token = applicationProperties.getPulsar().getToken();
        String topicStockMsg = applicationProperties.getPulsar().getTopicStockMsg();
        String subPrice = applicationProperties.getPulsar().getSubPrice();
        log.info("VPC 内网接入地址：{}",url);
        log.info("角色密钥：{}",token);
        // 一个Pulsar client对应一个客户端链接
        // 原则上一个进程一个client，尽量避免重复创建，消耗资源
        // 关于客户端和生产消费者的最佳实践，可以参考官方文档 https://cloud.tencent.com/document/product/1179/58090
        PulsarClient client = PulsarClient.builder()
                //ip:port 替换成路由ID，位于【集群管理】接入点列表
                .serviceUrl(url)
                //替换成角色密钥，位于【角色管理】页面
                .authentication(AuthenticationFactory.token(token))
                .build();
        System.out.println(">> pulsar client created.");

        //创建消费者
        Consumer<byte[]> consumer = client.newConsumer()
                //topic完整路径，格式为persistent://集群（租户）ID/命名空间/Topic名称，从【Topic管理】处复制
                .topic(topicStockMsg)
                //需要在控制台Topic详情页创建好一个订阅，此处填写订阅名
                .subscriptionName(subPrice)
                //声明消费模式为exclusive（独占）模式
                .subscriptionType(SubscriptionType.Exclusive)
                //配置从最早开始消费，否则可能会消费不到历史消息
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();
        System.out.println(">> pulsar consumer created.");

        //创建生产者
        Producer<byte[]> producer = client.newProducer()
                //topic完整路径，格式为persistent://集群（租户）ID/命名空间/Topic名称
                .topic(topicStockMsg)
                .create();
        System.out.println(">> pulsar producer created.");

        //生产5条消息
        for (int i = 0; i < 5; i++) {
            String value = "my-sync-message-" + i;
            //发送消息
            MessageId msgId = producer.newMessage().value(value.getBytes()).send();
            System.out.println("deliver msg " + msgId + ",value:" + value);
        }
        //关闭生产者
        producer.close();

        //消费5条消息
        for (int i = 0; i < 5; i++) {
            //接收当前offset对应的一条消息
            Message<byte[]> msg = consumer.receive();
            MessageId msgId = msg.getMessageId();
            String value = new String(msg.getValue());
            System.out.println("receive msg " + msgId + ",value:" + value);
            //接收到之后必须要ack，否则offset会一直停留在当前消息，无法继续消费
            consumer.acknowledge(msg);
        }

        //关闭消费进程
        consumer.close();
        //关闭客户端
        client.close();
    }
}
